rabbitmq 基于rabbitmq-delayed-message-exchange插件实现延迟队列

前言

在《rabbitmq 基于TTL和Dead Letter特性实现延迟队列》中,我们介绍了通过rabbitmq提供的TTL和Dead Letter特性来实现延迟队列的效果;今天我们将介绍另外一种方式来实现延迟队列 —— rabbitmq-delayed-message-exchange插件

插件安装

  1. 插件功能

    RabbitMQ Delayed Message Plugin 在rabbitmq中添加delayed-messaging(or scheduled-messaging)功能。用户可以通过x-delayed-message声明一个exchange,然后通过x-delayed-type属性指定exchange在rabbitmq中实际类型(有点类似设计模式中的装饰者模式,通过该插件对原有的exchange类型进行装饰,来动态加入新的功能,从而保证职责的清晰 ——即延迟功能)。然后在publish消息时,在消息header中设置x-delay值 ——消息延迟的时间(单位:毫秒)。在exchange中延迟x-delay时间之后,消息将会被投递给各自的队列,如果消息中没有设置x-delay header值,插件将不会延迟该消息而直接投递到各自队列中。

  2. 版本要求

    RabbitMQ Versions:RabbitMQ 3.5.8以上的版本;

    Erlang/OTP Versions:Erlang/OTP 18.0以上的版本;

  3. 插件成熟度

    官方说法:该插件是实验性的但很稳定,用户在了解其限制的情况下,可以运用到生产环境。在使用RabbitMQ Delayed Message Plugin情况下,会对原有的消息处理带来一定的性能影响,毕竟在原来消息转发的基础上了,多出了延迟功能逻辑处理。例如:对于经过“x-delayed-message” exchange的每个消息,插件都需要校验过期时间是否合法,以确保延迟范围在合法的时间范围内(即:Delay> 0,Delay = < ERL_MAX_T Erlang一个定时器可以设置为(2 ^ 32)-1毫秒)。如果消息参数设置合法,插件则将消息持久化到Mnesia中并调整当前调度的定时器等等。

    相关限制:

    1. 该插件不支持延迟消息的复制,在rabbimq镜像集群模式下,
    如果其中的一个节点宕机,会存在消息不可用,只能等该节点重新启动,才可以恢复;
    2. 目前该插件只支持在磁盘节点上使用,当前还不支持ram节点;
    3. 不适合具有大量延迟消息的情况(例如:数千或数百万的延迟消息);
  1. 安装过程

    # 下载插件
    cd /usr/local/rabbitmq-cluster/rabbitmq-node1/plugins
    wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

    # 启用插件
    cd /usr/local/rabbitmq-cluster/rabbitmq-node1/sbin
    ./rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    # 禁用插件
    ./rabbitmq-plugins disable rabbitmq_delayed_message_exchange

    # 如果是rabbitmq集群,则依次到各node执行上述步骤即可!

    rabbitmq插件下载地址

  2. 遇到的问题

    在wget下载插件时,出现域名DNS无法解析,在阿里云主机上,多试几次就可以解析了,估计是之前DNS服务器缓存没有此解析记录,DNS解析超时导致。
    如果尝试多次还是无法解析,建议修改本地DNS服务器地址。例如:常见的公共DNS服务器(google的8.8.8.8,国内的114.114.114.114等),具体设置参见本地DNS服务器修改

    Resolving bintray.com... failed: Name or service not known.
    wget: unable to resolve host address “bintray.com”

具体使用

实现原理图:
image
RabbitMQ Delayed Message Plugin负责消息的延迟功能,rabbitmq内置的direct类型的exchange负责消息的转发功能。各模块职责清晰,典型的装饰者模式设计思想,让我想起了java I/O中(BufferedInputStream and FileInputStream) or (BufferedOutputStream and FileOutputStream)的设计。

// 声明一个exchange,例如:channel.exchangeDeclare(exchange, type, durable, autoDelete, arguments)
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct"); //指定rabbitmq中实际类型(有四种值:direct,fanout,topic,headers)
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args); //这里指定exchange的装饰类型为x-delayed-message

// producer publish delay-message
byte[] messageBodyBytes = "delayed payload".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
headers = new HashMap<String, Object>();
headers.put("x-delay", 5000); //消息延迟5秒转发到各自队列
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

总结

从官方描述的插件成熟度,不难看出,只能简单的使用下,很难在生产环境使用。限制因素太多,要实现延迟队列的功能,个人建议还是选择通过rabbitmq的TTL和Dead Letter特性实现。而且该插件并不是rabbitmq官方推出的,rabbitmq官方并不会维护和优化该插件。

参考链接

  1. https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
  2. https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/